[SPARK-25664][SQL][TEST] Refactor JoinBenchmark to use main method #22661
wangyum wants to merge 7 commits into apache:master from wangyum:SPARK-25664
Conversation
Test build #97080 has finished for PR 22661 at commit
Test build #97090 has finished for PR 22661 at commit
```scala
val N = 20 << 20
val M = 1 << 16

val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
```
So, this is a removal of a redundant one, right?
```scala
val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
codegenBenchmark("Join w long duplicated", N) {
  val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
```
According to another benchmark case in this file, `broadcast` seems to be put outside of `codegenBenchmark`. What do you think about this?
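For illustration, a minimal sketch of the reordering the reviewer suggests (hypothetical; it assumes the `spark` session and the `codegenBenchmark` helper defined in this benchmark file, and uses `count()` simply to force execution):

```scala
// Hypothetical sketch: build and broadcast the dimension table once, outside
// the measured closure, matching the other benchmark cases in this file.
val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
codegenBenchmark("Join w long duplicated", N) {
  val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
  df.count()  // trigger the join so the benchmark measures actual execution
}
```

This way the benchmark times only the join itself, not the construction of the dimension DataFrame.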
```
Join w 2 ints:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off          138514 / 139178          0.2        6604.9       1.0X
Join w 2 ints wholestage on           129908 / 140869          0.2        6194.5       1.1X
```
Ur, is this correct? Previously, we had the following:
```
Join w 2 ints codegen=false               4426 / 4501          4.7        211.1       1.0X
Join w 2 ints codegen=true                 791 / 818          26.5         37.7       5.6X
```
I think it's correct; I ran it on master:
```console
build/sbt "sql/test-only *benchmark.JoinBenchmark"
......
[info] JoinBenchmark:
[info] - broadcast hash join, long key !!! IGNORED !!!
[info] - broadcast hash join, long key with duplicates !!! IGNORED !!!
Running benchmark: Join w 2 ints
  Running case: Join w 2 ints wholestage off
  Stopped after 2 iterations, 307335 ms
  Running case: Join w 2 ints wholestage on
  Stopped after 5 iterations, 687107 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Join w 2 ints:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off          153532 / 153668          0.1        7321.0       1.0X
Join w 2 ints wholestage on           132075 / 137422          0.2        6297.8       1.2X
```
Oh, interesting. Although it's beyond the scope of this PR, could you please run it on branch-2.4 and branch-2.3, too?
```scala
val N = 20 << 20
val M = 1 << 16

val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
```
For this change, we need to rerun the benchmark to get a new result.
Test build #97243 has finished for PR 22661 at commit
Test build #97249 has finished for PR 22661 at commit
```scala
import org.apache.spark.sql.types.IntegerType

/**
 * Benchmark to measure performance for aggregate primitives.
```
`aggregate primitives` -> `joins`
```scala
 * shuffle hash join codegen=false          2005 / 2010          2.1        478.0       1.0X
 * shuffle hash join codegen=true           1773 / 1792          2.4        422.7       1.1X
 */
override def runBenchmarkSuite(): Unit = {
```
Could you wrap the following (lines 168~177) with something like `runBenchmark("Join Benchmark")`?
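A sketch of the suggested wrapping (hypothetical; the case method names below are placeholders, and it assumes the `runBenchmark` helper that main-method benchmarks in this module inherit from their benchmark base class):

```scala
// Hypothetical sketch: group the individual join cases under one named
// section, as other main-method benchmarks in this module do.
override def runBenchmarkSuite(): Unit = {
  runBenchmark("Join Benchmark") {
    broadcastHashJoinLongKey()              // placeholder case names
    broadcastHashJoinLongKeyWithDuplicates()
    sortMergeJoin()
  }
}
```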
Test build #97279 has finished for PR 22661 at commit
retest this please
Test build #97287 has finished for PR 22661 at commit
```scala
runBenchmark("merge join", N) {
  val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
  val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
  codegenBenchmark("merge join", N) {
```
`merge join` -> `sort merge join`
@wangyum, could you review and merge wangyum#18?
```scala
 * -------------------------------------------------------------------------------------------
 * Join w 2 ints codegen=false              4426 / 4501          4.7        211.1       1.0X
 * Join w 2 ints codegen=true                791 / 818          26.5         37.7       5.6X
 */
```
Hi, @cloud-fan, @gatorsmile, @davies, @rxin.
We are hitting a performance slowdown in this benchmark. However, it is not a regression, because the result is consistent from 2.0.2 through 2.4.0-rc3.
```
Join w 2 ints:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off          157742 / 158892          0.1        7521.7       1.0X
Join w 2 ints wholestage on           134290 / 152917          0.2        6403.4       1.2X
```
According to the original performance numbers, they seem to come from a version where `HashJoin.rewriteKeyExpr` used a simple upcast to bigint. The current code, however, generates a plan where `HashJoin.rewriteKeyExpr` uses shiftleft operations.
```scala
scala> val df = spark.range(N).join(dim2, (col("id") % M).cast(IntegerType) === col("k1") && (col("id") % M).cast(IntegerType) === col("k2"))

scala> val df2 = spark.range(N).join(dim2, (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))

scala> df.explain
== Physical Plan ==
*(2) BroadcastHashJoin [cast((id#8L % 65536) as int), cast((id#8L % 65536) as int)], [k1#2, k2#3], Inner, BuildRight
:- *(2) Range (0, 20971520, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))))
   +- *(1) Project [cast(id#0L as int) AS k1#2, cast(id#0L as int) AS k2#3, cast(id#0L as string) AS v#4]
      +- *(1) Range (0, 65536, step=1, splits=8)

scala> df2.explain
== Physical Plan ==
*(2) BroadcastHashJoin [(id#23L % 65536), (id#23L % 65536)], [cast(k1#2 as bigint), cast(k2#3 as bigint)], Inner, BuildRight
:- *(2) Range (0, 20971520, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint), cast(input[1, int, false] as bigint)))
   +- *(1) Project [cast(id#0L as int) AS k1#2, cast(id#0L as int) AS k2#3, cast(id#0L as string) AS v#4]
      +- *(1) Range (0, 65536, step=1, splits=8)
```
Did we really want to measure the difference in `HashJoin.rewriteKeyExpr`?
Any advice is welcome, and thank you in advance, @cloud-fan, @gatorsmile, @davies, @rxin.
This seems to be caused by the bug fix #15390, so the performance is reasonable.
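To make the shiftleft packing in the plan above concrete: `HashJoin.rewriteKeyExpr` can combine two int keys into a single long key, roughly as in the standalone sketch below (hypothetical plain Scala, not the actual Spark expression tree — the real code builds `ShiftLeft`/`BitwiseOr`/`BitwiseAnd` expressions):

```scala
object PackKeys {
  // Pack two 32-bit int keys into one 64-bit long: the first key occupies the
  // upper 32 bits, the second the lower 32 (masked to avoid sign extension).
  // 4294967295 in the explain output above is exactly this 0xFFFFFFFF mask.
  def pack(k1: Int, k2: Int): Long =
    (k1.toLong << 32) | (k2.toLong & 0xFFFFFFFFL)

  def main(args: Array[String]): Unit = {
    // Distinct key pairs must pack to distinct longs.
    println(pack(1, 2))  // 4294967298
    println(pack(2, 1))  // 8589934593
  }
}
```

Hashing one packed long is cheaper than hashing two separate columns, which is why the rewrite exists; the benchmark difference comes from whether this rewrite applies.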
```scala
def getProcessorName(): String = {
  val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
    Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
      .stripLineEnd
```
Because on Mac the output has one more line than on Linux:
28f9b9a#diff-45c96c65f7c46bc2d84843a7cb92f22fL7
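As a small illustration of what `stripLineEnd` does here (plain Scala; the CPU string is just an example value):

```scala
object StripDemo {
  def main(args: Array[String]): Unit = {
    // sysctl output on macOS ends with a trailing newline; stripLineEnd drops
    // it so the CPU name renders on a single line in the benchmark header.
    val raw = "Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz\n"
    println(raw.stripLineEnd)  // prints the name without the trailing newline
  }
}
```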
Ur.. I'm not a fan of piggy-backing. Okay.
Test build #97299 has finished for PR 22661 at commit
```
Join w 2 ints:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off          173174 / 173183          0.1        8257.6       1.0X
Join w 2 ints wholestage on           166350 / 198362          0.1        7932.2       1.0X
```
It surprises me that whole-stage codegen doesn't help here. We should investigate it later.
Test build #97301 has finished for PR 22661 at commit
dongjoon-hyun left a comment:

+1, LGTM. Thank you, @wangyum and @cloud-fan.
Merged to master.
## What changes were proposed in this pull request?

Refactor `JoinBenchmark` to use main method.

1. Use `spark-submit`:
```console
bin/spark-submit --class org.apache.spark.sql.execution.benchmark.JoinBenchmark --jars ./core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar ./sql/catalyst/target/spark-sql_2.11-3.0.0-SNAPSHOT-tests.jar
```
2. Generate benchmark result:
```console
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.JoinBenchmark"
```

## How was this patch tested?

manual tests

Closes apache#22661 from wangyum/SPARK-25664.

Lead-authored-by: Yuming Wang <yumwang@ebay.com>
Co-authored-by: Yuming Wang <wgyumg@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>